
feat(ingestion): introduce TagRegistry domain layer #27991

Open
IceS2 wants to merge 8 commits into main from feature/tags-registry-refactor

Conversation

Contributor

@IceS2 IceS2 commented May 8, 2026

Fixes open-metadata/openmetadata-collate#3999

Describe your changes:

Why

The legacy context.tags list overloads two roles (sink-bound queue + per-entity inheritance lookup) and stores one Pydantic TagLabel per attachment. On a Snowflake schema with ~120k tag attachments this peaks at ~112 MB — disproportionate to the ~20 unique tags actually in play.

What

Adds metadata.domain.tags with two classes:

  • TagRegistry — per-Source bookkeeping for tag/classification ingestion
  • TagCanonicalizer — case-corrected name resolution against system-provider entities in OM

Migrates the Snowflake connector to use them. Other DB connectors continue on the legacy context.tags flow (strangler pattern; future PRs migrate per-connector).

How

  • TagRegistry interns shared TagLabel instances by (classification, tag, labelType, state) — N attachments dereference to the same Pydantic graph.
  • clear_scope rebinds _labels_by_entity to a fresh dict (not pop-in-place) so the hash-table bucket array releases immediately.
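The two mechanics above can be sketched roughly as follows. This is a minimal stand-in, not the real TagRegistry: the key shape, lock layout, and `TagLabel` stand-in are assumptions for illustration.

```python
import threading
from collections import namedtuple

# Hypothetical stand-in for the Pydantic TagLabel (the real one is an OM-generated type).
TagLabel = namedtuple("TagLabel", ["classification", "tag", "label_type", "state"])

class MiniTagRegistry:
    """Sketch of interning + scope clearing; not the actual implementation."""

    FQN_SEPARATOR = "."

    def __init__(self):
        self._lock = threading.Lock()
        self._tag_label_cache = {}   # (classification, tag, label_type, state) -> shared TagLabel
        self._labels_by_entity = {}  # entity FQN -> list of shared TagLabel references

    def attach(self, entity_fqn, classification, tag, label_type="Automated", state="Suggested"):
        key = (classification, tag, label_type, state)
        with self._lock:
            # Intern: N attachments dereference one shared TagLabel instead of N copies.
            label = self._tag_label_cache.setdefault(key, TagLabel(*key))
            self._labels_by_entity.setdefault(entity_fqn, []).append(label)

    def clear_scope(self, scope_fqn):
        prefix = scope_fqn + self.FQN_SEPARATOR
        with self._lock:
            # Rebind to a fresh dict instead of popping keys in place, so the old
            # hash table's bucket array is released immediately.
            self._labels_by_entity = {
                k: v for k, v in self._labels_by_entity.items()
                if k != scope_fqn and not k.startswith(prefix)
            }

reg = MiniTagRegistry()
reg.attach("svc.db.schema.t1", "PII", "Sensitive")
reg.attach("svc.db.schema.t2", "PII", "Sensitive")
a = reg._labels_by_entity["svc.db.schema.t1"][0]
b = reg._labels_by_entity["svc.db.schema.t2"][0]
print(a is b)  # True: both entities reference the same interned label
```

The `is`-identity at the end is the whole memory win: 120k attachments of ~20 unique tags hold ~20 label objects plus lists of references.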

Type of change:

  • Improvement

High-level design:

Architecture

metadata.domain.tags is a new domain layer (in-memory helpers operating on OM-generated types, with no I/O ownership beyond an injected OpenMetadata client for read-only queries). It has two classes with separated concerns:

  • TagRegistry — three storage groups behind two locks. _pending queues sink-bound OMetaTagAndClassification payloads (deduped by _known_tag_fqns). _labels_by_entity maps entity FQN → list of shared TagLabel references. _tag_label_cache interns the underlying TagLabel objects.
  • TagCanonicalizer — separate concern: case-corrected lookup against system-provider classifications/tags via ES with retry. Failures raise; callers wrap in Either.
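The canonicalizer's cache-and-fallback behavior can be sketched like this. The retry layer (tenacity in the real code) is elided; `MiniCanonicalizer`, the `lookup` callable, and the in-memory "server" are illustrative stand-ins, not the real API.

```python
import threading
from typing import Callable, NamedTuple, Optional

class Canonical(NamedTuple):
    name: str
    description: str

class MiniCanonicalizer:
    """Sketch: case-corrected lookup with caching; the real class queries OM via ES with retries."""

    def __init__(self, lookup: Callable[[str], Optional[Canonical]]):
        self._lookup = lookup  # stand-in for the ES-backed system-provider search
        self._lock = threading.Lock()
        self._cache = {}

    def classification(self, name: str, default_description: str) -> Canonical:
        key = name.lower()
        with self._lock:
            cached = self._cache.get(key)
            if cached is not None:
                return cached
        # Perform the lookup outside the lock so slow searches don't serialize callers.
        found = self._lookup(name)
        canonical = found if found is not None else Canonical(name, default_description)
        with self._lock:
            # setdefault: keep whichever result a concurrent caller cached first.
            return self._cache.setdefault(key, canonical)

# The server knows the case-corrected name "PII"; callers may pass "pii".
server = {"pii": Canonical("PII", "Personally identifiable information")}
canon = MiniCanonicalizer(lambda n: server.get(n.lower()))
print(canon.classification("pii", "fallback").name)          # PII
print(canon.classification("Unknown", "fallback").description)  # fallback
```

On a cache miss with no system match, the caller-provided default seeds the entry; a persistent failure would raise instead of poisoning the cache.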

DatabaseServiceSource exposes both as cached_property and wires clear_schema_tag_scope / clear_database_tag_scope topology hooks.

Files added

  • ingestion/src/metadata/domain/__init__.py
  • ingestion/src/metadata/domain/tags/__init__.py
  • ingestion/src/metadata/domain/tags/registry.py
  • ingestion/src/metadata/domain/tags/canonicalizer.py
  • ingestion/tests/unit/domain/tags/test_registry.py (34 tests)
  • ingestion/tests/unit/domain/tags/test_canonicalizer.py (13 tests)

Files updated

  • ingestion/src/metadata/ingestion/source/database/database_service.py — registry/canonicalizer wiring + topology hooks
  • ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py — migrated `yield_tag` and `get_*_tag_labels` overrides
  • ingestion/setup.py — added tenacity>=8.0,<10 for canonicalizer retries

Tests:

Unit tests

  • 34 tests in test_registry.py (attach/labels_for/drain/clear_scope/ensure_known/stats, thread safety, interning identity by `is`-equality, cache survives `clear_scope`, fqn-level dedup across `label_type` variants)
  • 13 tests in test_canonicalizer.py (canonicalization, retry behavior, no cache poisoning on persistent failure)

Manual testing performed

  1. Local Collate via `docker compose -f docker/development/docker-compose.yml up -d`
  2. Snowflake ingestion against a perf fixture with ~120k tag attachments on a single schema → peak heap ~112 MB → ~24 MB (~4.7× reduction)
  3. Multi-DB ingestion (5 perf databases × 39 schemas with tags) → per-scope peak ≤ 0.24 MB, every `clear_scope` releases bucket memory
  4. Behavioral verification of inheritance + dedup on `OM_PERF_MAIN.NORMAL_011.T_0001` (schema + table + column tag levels)

UI screen recording / screenshots:

Not applicable.

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is descriptive.
  • My PR is linked to a GitHub issue.
  • I have added tests (unit) and listed them above.

Summary by Gitar

  • Refactored TagRegistry contracts:
    • Updated TagRegistry.attach and _build_pending_record to require explicit description strings instead of optional values.
  • Enhanced TagCanonicalizer interface:
    • Changed classification and tag methods to use explicit default_description arguments for seeding metadata.
    • Enforced non-optional descriptions in Canonical named tuple to ensure data consistency.
  • Updated Snowflake connector:
    • Refactored yield_tag and yield_database_tag calls to provide required default descriptions for classification and tag lookups.
  • Updated unit tests:
    • Adjusted test_canonicalizer and test_snowflake tests to match the new strict parameter signatures.


Adds metadata.domain.tags with TagRegistry (per-Source bookkeeping) and
TagCanonicalizer (case-corrected name resolution against OM). Migrates
the Snowflake connector to the new architecture; other connectors stay
on the legacy context.tags flow (strangler pattern).

TagRegistry interns shared TagLabel instances by (classification, tag,
label_type, state) and rebinds the per-entity dict on scope clear. On a
schema with ~120k tag attachments and 21 unique tags, peak heap drops
from ~112 MB to ~24 MB.

Public get_*_tag_labels methods on the database service base are
unchanged; non-Snowflake DB connectors are not touched.
Copilot AI review requested due to automatic review settings May 8, 2026 12:37
@IceS2 IceS2 requested a review from a team as a code owner May 8, 2026 12:37
@github-actions github-actions Bot added the `Ingestion` and `safe to test` (Add this label to run secure Github workflows on PRs) labels May 8, 2026
Comment thread ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py Outdated
Contributor

Copilot AI left a comment

Pull request overview

This PR introduces a new ingestion “domain” layer for tags/classifications to reduce memory usage by interning TagLabel instances and separating canonicalization (case correction) from bookkeeping. It wires the new TagRegistry/TagCanonicalizer into the database source base and migrates the Snowflake connector to use the new flow, with added unit test coverage and a new tenacity dependency for retry behavior.

Changes:

  • Added metadata.domain.tags with TagRegistry (interning + scope clearing) and TagCanonicalizer (system-provider case-corrected resolution with retries).
  • Wired registry/canonicalizer into DatabaseServiceSource and migrated Snowflake tag ingestion + tag-label lookup to use the registry.
  • Added unit tests for the new domain layer and added tenacity to ingestion dependencies.

Reviewed changes

Copilot reviewed 9 out of 11 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
ingestion/src/metadata/domain/__init__.py Defines the new domain-layer package and its intended boundaries.
ingestion/src/metadata/domain/tags/__init__.py Exposes the tags domain API (TagRegistry, TagCanonicalizer, etc.).
ingestion/src/metadata/domain/tags/registry.py Implements tag payload dedup + TagLabel interning + per-scope label storage/clearing.
ingestion/src/metadata/domain/tags/canonicalizer.py Implements ES-backed canonicalization with retry + caching.
ingestion/src/metadata/ingestion/source/database/database_service.py Adds per-source registry/canonicalizer and topology hooks intended to clear per-scope tag state.
ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py Migrates Snowflake tag ingestion and tag-label lookup/inheritance to the new registry/canonicalizer.
ingestion/tests/unit/domain/__init__.py Test package marker for the new unit-test module path.
ingestion/tests/unit/domain/tags/__init__.py Test package marker for tags domain unit tests.
ingestion/tests/unit/domain/tags/test_registry.py Unit tests validating registry semantics, interning, and concurrency behavior.
ingestion/tests/unit/domain/tags/test_canonicalizer.py Unit tests validating canonicalization, caching, and retry behavior.
ingestion/setup.py Adds tenacity runtime dependency for canonicalizer retries.

Comment on lines +165 to +180
    def clear_scope(self, scope_fqn: str) -> None:
        """Drop labels under ``scope_fqn`` and mark the scope cleared.

        Subsequent ``attach`` calls for this scope will raise.
        """
        prefix = scope_fqn + fqn.FQN_SEPARATOR

        with self._scope_state_lock:
            kept = {
                k: v
                for k, v in self._labels_by_entity.items()
                if k != scope_fqn and not k.startswith(prefix)
            }
            dropped = len(self._labels_by_entity) - len(kept)
            self._labels_by_entity = kept
            if dropped:
                logger.debug(
                    "TagRegistry: cleared scope %s (%d entity labels dropped)",
                    scope_fqn,
                    dropped,
                )

        with self._run_state_lock:
            self._cleared_scopes.add(scope_fqn)
Comment on lines +620 to +649
    def yield_database_tag(self, database_entity: str) -> Iterable[Either[OMetaTagAndClassification]]:
        """Yield database-level tags for the topology."""
        if not self.source_config.includeTags:
            return

        if database_entity not in self.database_tags_map:
            return

        database_fqn = fqn.build(
            self.metadata,
            entity_type=Database,
            service_name=self.context.get().database_service,
            database_name=database_entity,
        )
        for tag_info in self.database_tags_map[database_entity]:
            try:
                classification = self.tag_canonicalizer.classification(
                    tag_info["tag_name"], SNOWFLAKE_CLASSIFICATION_DESCRIPTION
                )
                tag = self.tag_canonicalizer.tag(
                    classification.name, tag_info["tag_value"], SNOWFLAKE_TAG_DESCRIPTION
                )

                self.tags_registry.attach(
                    scope_fqn=database_fqn,
                    entity_fqn=database_fqn,
                    classification_name=classification.name,
                    tag_name=tag.name,
                    classification_description=classification.description,
                    tag_description=tag.description,
                )
            except Exception as exc:
from metadata.domain.tags.canonicalizer import Canonical, TagCanonicalizer
from metadata.domain.tags.registry import ScopeAlreadyClearedError, TagRegistry

__all__ = [
Member

nit - I don't like this because then people like to `import *`

Contributor Author

This is way handier than having too-deep imports imho; we have static checks to prevent `import *`

logger = ingestion_logger()


_es_retry = retry(
Member

Thinking this could be reused more commonly in other places beyond the canonicalizer, e.g., when we search for tables or owners.

Contributor Author

I agree, but did not want to add it there for now. If this works properly, I will need to create another PR to migrate the rest of the databases to use this and the other service types, while also doing some cleaning. For sure this should be moved there.

return canonical

@_es_retry
def _es_search(self, entity_type: Any, search_string: str) -> Iterable[Any]:
Member

this could be inside ometa with the decorator and all

Contributor Author

I agree, but did not want to add it there for now. If this works properly, I will need to create another PR to migrate the rest of the databases to use this and the other service types, while also doing some cleaning. For sure this should be moved there.

error=f"Tag canonicalization failed for {tag_info['tag_name']}.{tag_info['tag_value']}: {exc}",
)
)
yield from (Either(right=record) for record in self.tags_registry.drain())
Member

I don't follow why the drain() here helps vs the vanilla yield we had.

Contributor Author

It is not about the drain itself.
The gains are on how we populate and what we yield.

Before we were yielding the following:

                for tag_info in self.schema_tags_map[schema_name]:
                    yield from get_ometa_tag_and_classification(
                        tag_fqn=FullyQualifiedEntityName(schema_fqn),
                        tags=[tag_info["tag_value"]],
                        classification_name=tag_info["tag_name"],
                        tag_description=SNOWFLAKE_TAG_DESCRIPTION,
                        classification_description=SNOWFLAKE_CLASSIFICATION_DESCRIPTION,
                        metadata=self.metadata,
                        system_tags=True,
                    )

This was yielding the same tag and classification multiple times, since we are basically saving Tag references.
If Tag A was used once at a schema, once at a database, and once at a column, we would send 3 PUTs for that Tag and Classification.

drain() pulls from pending tags that have not yet been yielded to the sink; this logic should keep us from sending duplicates (except classification ones, due to the current coupling between the two, but I did not want to delve there given the urgency)

        with self._run_state_lock:
            tag_fqn = model_str(tag_label.tagFQN)
            if tag_fqn not in self._known_tag_fqns:
                self._known_tag_fqns.add(tag_fqn)
                self._pending.append(
                    self._build_pending_record(
                        classification_name=classification_name,
                        classification_description=classification_description,
                        tag_name=tag_name,
                        tag_description=tag_description,
                    )
                )

This is geared towards a time improvement; the memory improvement is handled within the TagRegistry as well, which avoids creating lots of repeated Pydantic TagLabel objects by using references to them instead.

All this while centralizing the tag logic in one place with a clear API.
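The pending-queue dedup described above can be sketched in isolation. `MiniPendingQueue` is a hypothetical reduction of the real TagRegistry's `_known_tag_fqns` / `_pending` / `drain()` trio: N attachments of the same tag FQN produce one sink payload.

```python
class MiniPendingQueue:
    """Sketch of the FQN-level dedup behind drain(); not the real TagRegistry."""

    def __init__(self):
        self._known_tag_fqns = set()
        self._pending = []

    def attach(self, classification, tag):
        tag_fqn = f"{classification}.{tag}"
        if tag_fqn not in self._known_tag_fqns:
            self._known_tag_fqns.add(tag_fqn)
            # One sink-bound record per unique tag, however many entities use it.
            self._pending.append({"classification": classification, "tag": tag})

    def drain(self):
        # Hand off everything queued since the last drain, exactly once.
        records, self._pending = self._pending, []
        return records

q = MiniPendingQueue()
for _ in range(3):     # same tag on a schema, a database, and a column
    q.attach("PII", "Sensitive")
print(len(q.drain()))  # 1: one PUT instead of three
print(len(q.drain()))  # 0: drained records are not re-sent
```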

schema_name=schema_name,
)
self.tags_registry.clear_scope(schema_fqn)
yield from ()
Member

silly q, why this yield?

Contributor Author

Due to how the post process actually works in the Topology Runner:
yield from self._run_node_post_process(node=node)

- Drop None defaults from `TagRegistry.attach()` description params
  (required `str | None`); normalize None -> "" inside
  `_build_pending_record` so the OM schema's required-non-null Markdown
  contract is owned at the registry boundary, not at every caller.
- Fix `yield_database_tag` missing registry drain (PR review).
- Fix `clear_scope` lock-order race that left labels visible after the
  scope was marked cleared (PR review).
- Resolve basedpyright errors:
  - `Either` and `TopologyNode` use Form 3 Pydantic v2 fields
    (`Annotated[X, Field(...)] = default`) so static checkers see the
    defaults.
  - `cast("str", fqn.build(...))` at the 13 sites that feed
    `fqn.build(...)` results into FQN-typed args.
  - Scoped `# pyright: ignore[reportAttributeAccessIssue]` on
    `TopologyContext` dynamic-attribute accesses (matches the codebase
    pattern of grandfathering 8.4k+ such errors via baseline).
- Populate `stackTrace` on the three snowflake tag-error
  `StackTraceError`s so the Status UI shows the trace, not just the
  one-line summary.
- Rewrite three snowflake tag-inheritance tests to drive the real
  registry attach + inheritance walk after `get_tag_label` was removed:
  - `test_schema_tag_inheritance`
  - `test_database_tag_inheritance`
  - `test_tag_value_precedence` (one attach intentionally passes
    `None` descriptions to exercise the registry's normalization)

github-actions Bot commented May 9, 2026

🟡 Playwright Results — all passed (18 flaky)

✅ 4067 passed · ❌ 0 failed · 🟡 18 flaky · ⏭️ 86 skipped

Shard Passed Failed Flaky Skipped
🟡 Shard 1 298 0 1 4
🟡 Shard 2 761 0 7 8
🟡 Shard 3 778 0 3 7
✅ Shard 4 790 0 0 18
🟡 Shard 5 707 0 2 41
🟡 Shard 6 733 0 5 8
🟡 18 flaky test(s) (passed on retry)
  • Features/CustomizeDetailPage.spec.ts › Domain - customization should work (shard 1, 1 retry)
  • Features/ActivityAPI.spec.ts › Activity event shows the actor who made the change (shard 2, 1 retry)
  • Features/BulkEditEntity.spec.ts › Glossary (shard 2, 1 retry)
  • Features/ColumnBulkOperations.spec.ts › should expand STRUCT column to show nested fields (shard 2, 1 retry)
  • Features/DataQuality/BundleSuiteBulkOperations.spec.ts › Add test case to existing Bundle Suite (shard 2, 1 retry)
  • Features/KnowledgeCenter.spec.ts › Article mentions in description should working for Knowledge Center (shard 2, 1 retry)
  • Features/KnowledgeCenterTextEditor.spec.ts › Rich Text Editor - Text Formatting (shard 2, 1 retry)
  • Features/KnowledgeCenterTextEditor.spec.ts › Rich Text Editor - Text Formatting (shard 2, 1 retry)
  • Features/RTL.spec.ts › Verify Following widget functionality (shard 3, 1 retry)
  • Flow/PersonaDeletionUserProfile.spec.ts › User profile loads correctly before and after persona deletion (shard 3, 1 retry)
  • Flow/PersonaFlow.spec.ts › Set default persona for team should work properly (shard 3, 1 retry)
  • Pages/EntityDataSteward.spec.ts › User as Owner Add, Update and Remove (shard 5, 1 retry)
  • Pages/ExplorePageRightPanel_KnowledgeCenter.spec.ts › Should remove user owner for knowledgeCenter (shard 5, 1 retry)
  • Pages/Glossary.spec.ts › Async Delete - WebSocket failure triggers recovery (shard 6, 1 retry)
  • Pages/GlossaryImportExport.spec.ts › Glossary CSV import preserves typed relations (shard 6, 1 retry)
  • Pages/Lineage/DataAssetLineage.spec.ts › Column lineage for dashboardDataModel -> container (shard 6, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify Impact Analysis service filter selection (shard 6, 1 retry)
  • Pages/Users.spec.ts › User should have only view permission for glossary and tags for Data Consumer (shard 6, 1 retry)

📦 Download artifacts

How to debug locally
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip    # view trace

…cascade

Reverting `Either.left`/`Either.right` to the original
`Annotated[Optional[T], Field(default=None, ...)]` form. The Form 3
shape (`Annotated[T | None, Field(...)] = None`) introduced in the
prior commit caused pyright to eagerly bind `T` from the literal-None
default at every no-arg construction site like `Either(left=...)`,
resolving them to `Either[Unknown]`. Because `Either[T]` is invariant,
those failed to satisfy declared generator return types like
`Iterable[Either[CreateTableRequest]]` — surfacing 45 latent
reportReturnType errors across sample_data, dbt, sas, qliksense, sigma,
common_db_source, common_broker_source, amundsen, sink/metadata_rest.

Form 2 wraps the default inside `Annotated` metadata where pyright
treats it as opaque: it sees "this field has a default" (so construction
sites pass) but doesn't eagerly bind `T` from None. Context-driven
inference works, no cascade.

`TopologyNode` stays in Form 3 — its fields are concrete-typed
(`list[str] | None`, `bool`), no Generic-T to bind.
Copilot AI review requested due to automatic review settings May 9, 2026 08:49
Comment thread ingestion/src/metadata/domain/tags/registry.py Outdated
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 13 changed files in this pull request and generated 3 comments.

Comment on lines +231 to +239
    @cached_property
    def tags_registry(self) -> TagRegistry:
        """Per-Source registry tracking tag/classification ingestion state."""
        return TagRegistry(metadata=self.metadata)

    @cached_property
    def tag_canonicalizer(self) -> TagCanonicalizer:
        """Per-Source canonicalizer for case-corrected tag/classification names."""
        return TagCanonicalizer(metadata=self.metadata)
Comment on lines +135 to +138
with self._scope_state_lock:
self._labels_by_entity.setdefault(entity_fqn, []).append(tag_label)

with self._run_state_lock:
Comment on lines +24 to +25
"""Skip tenacity's between-retry sleeps so retry-tests run instantly."""
monkeypatch.setattr("tenacity.nap.time.sleep", lambda *_args, **_kwargs: None)
…-check

Two refinements from a fresh round of Copilot review:

1. **Merge ``_run_state_lock`` and ``_scope_state_lock`` into a single
   ``_lock``.** Two locks were a smell: every ``attach()`` already needed
   both, the lock-acquisition order varied across methods (``attach`` does
   one order, ``stats`` the reverse — fragile if any future code held both
   simultaneously), and ``RLock`` was used defensively without any actual
   re-entry. One lock = single mental model, no ordering invariant, deadlock
   impossible.

2. **Move ``_intern_tag_label`` invocation inside the cleared-scope
   gate** (Copilot review). With one lock, the cleared-check, intern,
   label-append, and pending-update all happen in one atomic critical
   section — so when a scope is already cleared, no TagLabel is interned
   into ``_tag_label_cache``. Verified deterministically: 30 threads
   attaching to a pre-cleared scope all raise and the cache stays empty.

3. Reword the inline ``vars(self).setdefault`` thread-safety comment in
   ``database_service.py`` to drop the misleading Python 3.14 reference
   (project targets 3.10+) — just point at the official
   ``threadsafety.html`` doc.
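Points 1 and 2 together can be illustrated with a reduced attach/clear pair. This is a sketch under assumptions: `MiniRegistry`, its fields, and the error class mirror the names in this PR but are not the real implementation.

```python
import threading

class ScopeAlreadyClearedError(RuntimeError):
    pass

class MiniRegistry:
    """Sketch: cleared-check, intern, and append happen under one lock, atomically."""

    def __init__(self):
        self._lock = threading.Lock()
        self._cleared_scopes = set()
        self._tag_label_cache = {}
        self._labels_by_entity = {}

    def attach(self, scope_fqn, entity_fqn, key):
        with self._lock:
            # Gate first: if the scope is already cleared, nothing below runs,
            # so no label is interned and no entity entry is created.
            if scope_fqn in self._cleared_scopes:
                raise ScopeAlreadyClearedError(scope_fqn)
            label = self._tag_label_cache.setdefault(key, object())
            self._labels_by_entity.setdefault(entity_fqn, []).append(label)

    def clear_scope(self, scope_fqn):
        with self._lock:
            # Same lock: no attach can interleave between the mark and the rebuild.
            self._cleared_scopes.add(scope_fqn)
            self._labels_by_entity = {
                k: v for k, v in self._labels_by_entity.items()
                if not k.startswith(scope_fqn)
            }

r = MiniRegistry()
r.clear_scope("svc.db.schema")
try:
    r.attach("svc.db.schema", "svc.db.schema.t1", ("PII", "Sensitive"))
except ScopeAlreadyClearedError:
    pass
print(len(r._tag_label_cache))  # 0: nothing interned for a cleared scope
```

With a single lock there is no acquisition order to maintain, so the TOCTOU interleaving between the cleared-scope check and the label append cannot occur.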
Copilot AI review requested due to automatic review settings May 10, 2026 07:59
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 13 changed files in this pull request and generated 3 comments.

Comment on lines +69 to +93
    def classification(
        self,
        name: str,
        description: str | None = None,
    ) -> Canonical:
        """Return canonical classification name + description from OM, cached."""
        key = name.lower()
        with self._lock:
            cached = self._classification_cache.get(key)
            if cached is not None:
                return cached

        results = self._es_search(Classification, name)
        canonical = Canonical(name=name, description=description)
        for entity in results:
            if entity.provider == ProviderType.system and entity.name.root.lower() == key:
                canonical = Canonical(
                    name=entity.name.root,
                    description=entity.description.root if entity.description else description,
                )
                break

        with self._lock:
            self._classification_cache.setdefault(key, canonical)
        return canonical
tag_request=CreateTagRequest( # pyright: ignore[reportCallIssue]
classification=FullyQualifiedEntityName(classification_name),
name=EntityName(tag_name),
description=Markdown(tag_description or ""),
Comment on lines +572 to +594
        entity_fqn = fqn._build(self.context.get().database_service, *fqn_elements)  # pyright: ignore[reportAttributeAccessIssue]
        try:
            classification = self.tag_canonicalizer.classification(row[0], SNOWFLAKE_CLASSIFICATION_DESCRIPTION)
            tag = self.tag_canonicalizer.tag(classification.name, row[1], SNOWFLAKE_TAG_DESCRIPTION)

            self.tags_registry.attach(
                scope_fqn=schema_fqn,
                entity_fqn=entity_fqn,
                classification_name=classification.name,
                tag_name=tag.name,
                classification_description=classification.description,
                tag_description=tag.description,
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            yield Either(
                left=StackTraceError(
                    name=f"{row[0]}.{row[1]}",
                    error=f"Tag canonicalization failed for {row[0]}.{row[1]}: {exc}",
                    stackTrace=traceback.format_exc(),
                ),
                right=None,
            )
…er params

End-to-end tightening to prevent the empty-description / overwrite path
flagged in PR review. Three coordinated changes:

1. **``Canonical.description`` is now ``str`` (required)** instead of
   ``str | None``. The canonicalizer always seeds with the caller-provided
   default and only overrides with a non-empty server value, so the
   resolved description is invariably a real string. Removing the
   Optional makes that invariant visible at the type level.

2. **Rename canonicalizer parameters to make their fallback semantics
   honest**:
   - ``classification(name, description)`` →
     ``classification(name, default_description)``
   - ``tag(classification_name, tag_name, tag_description)`` →
     ``tag(classification_name, tag_name, default_tag_description)``

   The parameter is used to seed the Canonical when no system match
   exists in OM, and as a fallback when an OM match has an empty
   description — never as the value to set on an existing entity. The
   rename signals "if I have to invent this, here's what to write down"
   rather than "set the description to this." Snowflake call sites use
   the new kwargs explicitly for self-documenting reads.

3. **Tighten ``TagRegistry.attach`` and ``_build_pending_record`` to
   required ``str`` for both descriptions** (was ``str | None``). With
   ``Canonical.description`` now ``str``, every caller can pass through
   without an ``or ""`` shim. ``_build_pending_record`` drops the
   defensive ``Markdown(x or "")`` for clean ``Markdown(x)``.

Net effect: no possible code path sends ``None`` or relies on an
``or ""`` fallback that would clobber a server-side description. The
schema's required-Markdown contract is enforced at every layer above
the wire.

gitar-bot Bot commented May 10, 2026

Code Review ✅ Approved 3 resolved / 3 findings

Introduces a TagRegistry domain layer to optimize memory usage and provides robust tag canonicalization for the Snowflake connector. Resolved critical registry drainage and race condition issues during the review.

✅ 3 resolved
Bug: yield_database_tag never drains registry — tags lost

📄 ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py:620-634
In yield_database_tag, tags are attached to the registry via self.tags_registry.attach(...) but there is no corresponding yield from (Either(right=record) for record in self.tags_registry.drain()) at the end of the method. This means the OMetaTagAndClassification payloads for database-level tags are queued but never yielded to the topology/sink for creation in OpenMetadata.

By contrast, yield_tag (schema-level) correctly calls drain() at line 618. Without this, database-level classifications and tags will never be created server-side (though their labels will still be attached to entities via inheritance, referencing non-existent tags).

Edge Case: TOCTOU race between _run_state_lock and _scope_state_lock in attach

📄 ingestion/src/metadata/domain/tags/registry.py:122-136
In TagRegistry.attach, the check for _cleared_scopes (line 122-126 under _run_state_lock) and the mutation of _labels_by_entity (line 135-136 under _scope_state_lock) are not atomic. A concurrent clear_scope call could interleave between these two critical sections: Thread A passes the cleared-scope guard, Thread B calls clear_scope (marks the scope cleared and rebuilds _labels_by_entity), Thread A then appends a label into the freshly rebuilt dict — effectively inserting a label into a scope that was just cleared.

In the current topology model this is unlikely to trigger because clear_scope runs in post_process after all attach calls for that scope have finished. However, the class advertises itself as "safe for concurrent use across the topology's parallel schema workers" (docstring line 21), which makes this a latent correctness gap if threading assumptions change.

Quality: Comment links to non-existent Python docs page

📄 ingestion/src/metadata/ingestion/source/database/database_service.py:231
The comment references https://docs.python.org/3/library/threadsafety.html which does not exist in the CPython documentation. The relevant guarantees are discussed in the threading module docs or PEP 703. A broken link misleads future readers looking for the safety justification.




Labels

Ingestion · safe to test (Add this label to run secure Github workflows on PRs)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants